from ._kafka import Producer, Consumer, KafkaAdmin

from config import settings
from items import RequestItem

kafka_producer = Producer(settings.kafka.servers)
kafka_consumer = Consumer(settings.kafka.servers)
kafka_admin = KafkaAdmin(settings.kafka.servers)


class Kafka:
    producer = kafka_producer
    consumer = kafka_consumer
    admin = kafka_admin

    @classmethod
    def send_req_item(cls, item: RequestItem):
        if not isinstance(item, RequestItem):
            raise ValueError("[item]类型错误")
        Kafka.producer.send(item.dict(), 'key', settings.kafka.consumer.topic)
